From 4657d174aba23168b14408dc59a3ec181154b833 Mon Sep 17 00:00:00 2001 From: "kaf24@firebug.cl.cam.ac.uk" Date: Sun, 9 Oct 2005 18:52:54 +0100 Subject: [PATCH] Refactor xenbus to break up the xenbus_lock and permit watches to fire concurrently with request/reply pairs. Remove watch_ack message: no longer needed. Signed-off-by: Keir Fraser --- .../arch/xen/i386/kernel/smpboot.c | 8 +- linux-2.6-xen-sparse/arch/xen/kernel/reboot.c | 5 - .../drivers/xen/balloon/balloon.c | 5 - .../drivers/xen/xenbus/xenbus_comms.c | 22 +- .../drivers/xen/xenbus/xenbus_dev.c | 150 +++--- .../drivers/xen/xenbus/xenbus_probe.c | 7 +- .../drivers/xen/xenbus/xenbus_xs.c | 450 +++++++++++++----- linux-2.6-xen-sparse/include/asm-xen/xenbus.h | 14 +- tools/blktap/xenbus.c | 4 - tools/console/daemon/io.c | 1 - tools/python/xen/lowlevel/xs/xs.c | 42 -- tools/python/xen/xend/xenstore/xswatch.py | 9 +- tools/xenstore/testsuite/07watch.test | 21 - tools/xenstore/testsuite/08transaction.test | 4 - .../xenstore/testsuite/10domain-homedir.test | 1 - tools/xenstore/testsuite/11domain-watch.test | 2 - tools/xenstore/testsuite/12readonly.test | 1 - tools/xenstore/testsuite/13watch-ack.test | 1 - tools/xenstore/xenstored_core.c | 11 - tools/xenstore/xenstored_core.h | 3 - tools/xenstore/xenstored_watch.c | 56 +-- tools/xenstore/xs.c | 49 +- tools/xenstore/xs.h | 6 - tools/xenstore/xs_test.c | 11 - xen/include/public/io/xs_wire.h | 2 - 25 files changed, 498 insertions(+), 387 deletions(-) diff --git a/linux-2.6-xen-sparse/arch/xen/i386/kernel/smpboot.c b/linux-2.6-xen-sparse/arch/xen/i386/kernel/smpboot.c index 79d5b93d46..c4bfaeacbd 100644 --- a/linux-2.6-xen-sparse/arch/xen/i386/kernel/smpboot.c +++ b/linux-2.6-xen-sparse/arch/xen/i386/kernel/smpboot.c @@ -1327,18 +1327,14 @@ static struct xenbus_watch cpu_watch = { .callback = handle_vcpu_hotplug_event }; -/* NB: Assumes xenbus_lock is held! */ static int setup_cpu_watcher(struct notifier_block *notifier, unsigned long event, void *data) { - int err = 0; + int err; - BUG_ON(down_trylock(&xenbus_lock) == 0); err = register_xenbus_watch(&cpu_watch); - - if (err) { + if (err) printk("Failed to register watch on /cpu\n"); - } return NOTIFY_DONE; } diff --git a/linux-2.6-xen-sparse/arch/xen/kernel/reboot.c b/linux-2.6-xen-sparse/arch/xen/kernel/reboot.c index b83f136f70..6094573a82 100644 --- a/linux-2.6-xen-sparse/arch/xen/kernel/reboot.c +++ b/linux-2.6-xen-sparse/arch/xen/kernel/reboot.c @@ -360,9 +360,6 @@ static struct xenbus_watch sysrq_watch = { static struct notifier_block xenstore_notifier; -/* Setup our watcher - NB: Assumes xenbus_lock is held! -*/ static int setup_shutdown_watcher(struct notifier_block *notifier, unsigned long event, void *data) @@ -372,8 +369,6 @@ static int setup_shutdown_watcher(struct notifier_block *notifier, int err2 = 0; #endif - BUG_ON(down_trylock(&xenbus_lock) == 0); - err1 = register_xenbus_watch(&shutdown_watch); #ifdef CONFIG_MAGIC_SYSRQ err2 = register_xenbus_watch(&sysrq_watch); diff --git a/linux-2.6-xen-sparse/drivers/xen/balloon/balloon.c b/linux-2.6-xen-sparse/drivers/xen/balloon/balloon.c index 119315c5e4..36c6286cd3 100644 --- a/linux-2.6-xen-sparse/drivers/xen/balloon/balloon.c +++ b/linux-2.6-xen-sparse/drivers/xen/balloon/balloon.c @@ -370,17 +370,12 @@ static void watch_target(struct xenbus_watch *watch, } -/* Setup our watcher - NB: Assumes xenbus_lock is held! -*/ int balloon_init_watcher(struct notifier_block *notifier, unsigned long event, void *data) { int err; - BUG_ON(down_trylock(&xenbus_lock) == 0); - err = register_xenbus_watch(&target_watch); if (err) printk(KERN_ERR "Failed to set balloon watcher\n"); diff --git a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_comms.c b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_comms.c index 7d0ef111bc..aa2e90b7cb 100644 --- a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_comms.c +++ b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_comms.c @@ -130,15 +130,10 @@ int xb_write(const void *data, unsigned len) wait_event(xb_waitq, output_avail(out)); - /* Read, then check: not that we don't trust store. - * Hell, some of my best friends are daemons. But, - * in this post-911 world... */ - h = *out; mb(); - if (!check_buffer(&h)) { - set_current_state(TASK_RUNNING); - return -EIO; /* ETERRORIST! */ - } + h = *out; + if (!check_buffer(&h)) + return -EIO; dst = get_output_chunk(&h, out->buf, &avail); if (avail > len) @@ -173,12 +168,11 @@ int xb_read(void *data, unsigned len) const char *src; wait_event(xb_waitq, xs_input_avail()); - h = *in; + mb(); - if (!check_buffer(&h)) { - set_current_state(TASK_RUNNING); + h = *in; + if (!check_buffer(&h)) return -EIO; - } src = get_input_chunk(&h, in->buf, &avail); if (avail > len) @@ -195,10 +189,6 @@ int xb_read(void *data, unsigned len) notify_remote_via_evtchn(xen_start_info->store_evtchn); } - /* If we left something, wake watch thread to deal with it. */ - if (xs_input_avail()) - wake_up(&xb_waitq); - return 0; } diff --git a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_dev.c b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_dev.c index a1d24be2b4..f4c0d35c3c 100644 --- a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_dev.c +++ b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_dev.c @@ -46,85 +46,113 @@ #include struct xenbus_dev_data { - /* Are there bytes left to be read in this message? */ - int bytes_left; - /* Are we still waiting for the reply to a message we wrote? */ - int awaiting_reply; - /* Buffer for outgoing messages. */ + int in_transaction; + + /* Partial request. */ unsigned int len; union { struct xsd_sockmsg msg; char buffer[PAGE_SIZE]; } u; + + /* Response queue. */ +#define MASK_READ_IDX(idx) ((idx)&(PAGE_SIZE-1)) + char read_buffer[PAGE_SIZE]; + unsigned int read_cons, read_prod; + wait_queue_head_t read_waitq; }; static struct proc_dir_entry *xenbus_dev_intf; -/* Reply can be long (dir, getperm): don't buffer, just examine - * headers so we can discard rest if they die. */ static ssize_t xenbus_dev_read(struct file *filp, char __user *ubuf, size_t len, loff_t *ppos) { - struct xenbus_dev_data *data = filp->private_data; - struct xsd_sockmsg msg; - int err; - - /* Refill empty buffer? */ - if (data->bytes_left == 0) { - if (len < sizeof(msg)) - return -EINVAL; - - err = xb_read(&msg, sizeof(msg)); - if (err) - return err; - data->bytes_left = msg.len; - if (ubuf && copy_to_user(ubuf, &msg, sizeof(msg)) != 0) - return -EFAULT; - /* We can receive spurious XS_WATCH_EVENT messages. */ - if (msg.type != XS_WATCH_EVENT) - data->awaiting_reply = 0; - return sizeof(msg); + struct xenbus_dev_data *u = filp->private_data; + int i; + + if (wait_event_interruptible(u->read_waitq, + u->read_prod != u->read_cons)) + return -EINTR; + + for (i = 0; i < len; i++) { + if (u->read_cons == u->read_prod) + break; + put_user(u->read_buffer[MASK_READ_IDX(u->read_cons)], ubuf+i); + u->read_cons++; } - /* Don't read over next header, or over temporary buffer. */ - if (len > sizeof(data->u.buffer)) - len = sizeof(data->u.buffer); - if (len > data->bytes_left) - len = data->bytes_left; + return i; +} + +static void queue_reply(struct xenbus_dev_data *u, + char *data, unsigned int len) +{ + int i; - err = xb_read(data->u.buffer, len); - if (err) - return err; + for (i = 0; i < len; i++, u->read_prod++) + u->read_buffer[MASK_READ_IDX(u->read_prod)] = data[i]; - data->bytes_left -= len; - if (ubuf && copy_to_user(ubuf, data->u.buffer, len) != 0) - return -EFAULT; - return len; + BUG_ON((u->read_prod - u->read_cons) > sizeof(u->read_buffer)); + + wake_up(&u->read_waitq); } -/* We do v. basic sanity checking so they don't screw up kernel later. */ static ssize_t xenbus_dev_write(struct file *filp, const char __user *ubuf, size_t len, loff_t *ppos) { - struct xenbus_dev_data *data = filp->private_data; - int err; + struct xenbus_dev_data *u = filp->private_data; + void *reply; + int err = 0; - /* We gather data in buffer until we're ready to send it. */ - if (len > data->len + sizeof(data->u)) + if ((len + u->len) > sizeof(u->u.buffer)) return -EINVAL; - if (copy_from_user(data->u.buffer + data->len, ubuf, len) != 0) + + if (copy_from_user(u->u.buffer + u->len, ubuf, len) != 0) return -EFAULT; - data->len += len; - if (data->len >= sizeof(data->u.msg) + data->u.msg.len) { - err = xb_write(data->u.buffer, data->len); - if (err) - return err; - data->len = 0; - data->awaiting_reply = 1; + + u->len += len; + if (u->len < (sizeof(u->u.msg) + u->u.msg.len)) + return len; + + switch (u->u.msg.type) { + case XS_TRANSACTION_START: + case XS_TRANSACTION_END: + case XS_DIRECTORY: + case XS_READ: + case XS_GET_PERMS: + case XS_RELEASE: + case XS_GET_DOMAIN_PATH: + case XS_WRITE: + case XS_MKDIR: + case XS_RM: + case XS_SET_PERMS: + reply = xenbus_dev_request_and_reply(&u->u.msg); + if (IS_ERR(reply)) + err = PTR_ERR(reply); + else { + if (u->u.msg.type == XS_TRANSACTION_START) + u->in_transaction = 1; + if (u->u.msg.type == XS_TRANSACTION_END) + u->in_transaction = 0; + queue_reply(u, (char *)&u->u.msg, sizeof(u->u.msg)); + queue_reply(u, (char *)reply, u->u.msg.len); + kfree(reply); + } + break; + + default: + err = -EINVAL; + break; + } + + if (err == 0) { + u->len = 0; + err = len; } - return len; + + return err; } static int xenbus_dev_open(struct inode *inode, struct file *filp) @@ -134,7 +162,6 @@ static int xenbus_dev_open(struct inode *inode, struct file *filp) if (xen_start_info->store_evtchn == 0) return -ENOENT; - /* Don't try seeking. */ nonseekable_open(inode, filp); u = kmalloc(sizeof(*u), GFP_KERNEL); @@ -142,28 +169,21 @@ static int xenbus_dev_open(struct inode *inode, struct file *filp) return -ENOMEM; memset(u, 0, sizeof(*u)); + init_waitqueue_head(&u->read_waitq); filp->private_data = u; - down(&xenbus_lock); - return 0; } static int xenbus_dev_release(struct inode *inode, struct file *filp) { - struct xenbus_dev_data *data = filp->private_data; - - /* Discard any unread replies. */ - while (data->bytes_left || data->awaiting_reply) - xenbus_dev_read(filp, NULL, sizeof(data->u.buffer), NULL); - - /* Harmless if no transaction in progress. */ - xenbus_transaction_end(1); + struct xenbus_dev_data *u = filp->private_data; - up(&xenbus_lock); + if (u->in_transaction) + xenbus_transaction_end(1); - kfree(data); + kfree(u); return 0; } diff --git a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_probe.c b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_probe.c index e872df679c..a320bf36ae 100644 --- a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_probe.c +++ b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_probe.c @@ -43,6 +43,9 @@ static struct notifier_block *xenstore_chain; +/* Now used to protect xenbus probes against save/restore. */ +static DECLARE_MUTEX(xenbus_lock); + /* If something in array of ids matches this device, return it. */ static const struct xenbus_device_id * match_device(const struct xenbus_device_id *arr, struct xenbus_device *dev) @@ -625,12 +628,13 @@ void xenbus_suspend(void) down(&xenbus_lock); bus_for_each_dev(&xenbus_frontend.bus, NULL, NULL, suspend_dev); bus_for_each_dev(&xenbus_backend.bus, NULL, NULL, suspend_dev); + xs_suspend(); } void xenbus_resume(void) { xb_init_comms(); - reregister_xenbus_watches(); + xs_resume(); bus_for_each_dev(&xenbus_frontend.bus, NULL, NULL, resume_dev); bus_for_each_dev(&xenbus_backend.bus, NULL, NULL, resume_dev); up(&xenbus_lock); @@ -685,6 +689,7 @@ int do_xenbus_probe(void *unused) /* Notify others that xenstore is up */ notifier_call_chain(&xenstore_chain, 0, 0); up(&xenbus_lock); + return 0; } diff --git a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_xs.c b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_xs.c index 744576c02f..8e52045853 100644 --- a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_xs.c +++ b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_xs.c @@ -42,11 +42,67 @@ #define streq(a, b) (strcmp((a), (b)) == 0) -static char printf_buffer[4096]; +struct xs_stored_msg { + struct xsd_sockmsg hdr; + + union { + /* Stored replies. */ + struct { + struct list_head list; + char *body; + } reply; + + /* Queued watch callbacks. */ + struct { + struct work_struct work; + struct xenbus_watch *handle; + char **vec; + unsigned int vec_size; + } watch; + } u; +}; + +struct xs_handle { + /* A list of replies. Currently only one will ever be outstanding. */ + struct list_head reply_list; + spinlock_t reply_lock; + wait_queue_head_t reply_waitq; + + /* One request at a time. */ + struct semaphore request_mutex; + + /* One transaction at a time. */ + struct semaphore transaction_mutex; + int transaction_pid; +}; + +static struct xs_handle xs_state; + static LIST_HEAD(watches); +static DEFINE_SPINLOCK(watches_lock); -DECLARE_MUTEX(xenbus_lock); -EXPORT_SYMBOL(xenbus_lock); +/* Can wait on !xs_resuming for suspend/resume cycle to complete. */ +static int xs_resuming; +static DECLARE_WAIT_QUEUE_HEAD(xs_resuming_waitq); + +static void request_mutex_acquire(void) +{ + /* + * We can't distinguish non-transactional from transactional + * requests right now. So temporarily acquire the transaction mutex + * if this task is outside transaction context. + */ + if (xs_state.transaction_pid != current->pid) + down(&xs_state.transaction_mutex); + down(&xs_state.request_mutex); +} + +static void request_mutex_release(void) +{ + up(&xs_state.request_mutex); + if (xs_state.transaction_pid != current->pid) + up(&xs_state.transaction_mutex); +} static int get_error(const char *errorstring) { @@ -65,29 +121,32 @@ static int get_error(const char *errorstring) static void *read_reply(enum xsd_sockmsg_type *type, unsigned int *len) { - struct xsd_sockmsg msg; - void *ret; - int err; - - err = xb_read(&msg, sizeof(msg)); - if (err) - return ERR_PTR(err); + struct xs_stored_msg *msg; + char *body; - ret = kmalloc(msg.len + 1, GFP_KERNEL); - if (!ret) - return ERR_PTR(-ENOMEM); + spin_lock(&xs_state.reply_lock); - err = xb_read(ret, msg.len); - if (err) { - kfree(ret); - return ERR_PTR(err); + while (list_empty(&xs_state.reply_list)) { + spin_unlock(&xs_state.reply_lock); + wait_event(xs_state.reply_waitq, + !list_empty(&xs_state.reply_list)); + spin_lock(&xs_state.reply_lock); } - ((char*)ret)[msg.len] = '\0'; - *type = msg.type; + msg = list_entry(xs_state.reply_list.next, + struct xs_stored_msg, u.reply.list); + list_del(&msg->u.reply.list); + + spin_unlock(&xs_state.reply_lock); + + *type = msg->hdr.type; if (len) - *len = msg.len; - return ret; + *len = msg->hdr.len; + body = msg->u.reply.body; + + kfree(msg); + + return body; } /* Emergency write. */ @@ -98,10 +157,45 @@ void xenbus_debug_write(const char *str, unsigned int count) msg.type = XS_DEBUG; msg.len = sizeof("print") + count + 1; + request_mutex_acquire(); xb_write(&msg, sizeof(msg)); xb_write("print", sizeof("print")); xb_write(str, count); xb_write("", 1); + request_mutex_release(); +} + +void *xenbus_dev_request_and_reply(struct xsd_sockmsg *msg) +{ + void *ret; + struct xsd_sockmsg req_msg = *msg; + int err; + + if (req_msg.type == XS_TRANSACTION_START) { + down(&xs_state.transaction_mutex); + xs_state.transaction_pid = current->pid; + } + + request_mutex_acquire(); + + err = xb_write(msg, sizeof(*msg) + msg->len); + if (err) { + msg->type = XS_ERROR; + ret = ERR_PTR(err); + } else { + ret = read_reply(&msg->type, &msg->len); + } + + request_mutex_release(); + + if ((msg->type == XS_TRANSACTION_END) || + ((req_msg.type == XS_TRANSACTION_START) && + (msg->type == XS_ERROR))) { + xs_state.transaction_pid = -1; + up(&xs_state.transaction_mutex); + } + + return ret; } /* Send message to xs, get kmalloc'ed reply. ERR_PTR() on error. */ @@ -115,31 +209,33 @@ static void *xs_talkv(enum xsd_sockmsg_type type, unsigned int i; int err; - WARN_ON(down_trylock(&xenbus_lock) == 0); - msg.type = type; msg.len = 0; for (i = 0; i < num_vecs; i++) msg.len += iovec[i].iov_len; + request_mutex_acquire(); + err = xb_write(&msg, sizeof(msg)); - if (err) + if (err) { + up(&xs_state.request_mutex); return ERR_PTR(err); + } for (i = 0; i < num_vecs; i++) { err = xb_write(iovec[i].iov_base, iovec[i].iov_len);; - if (err) + if (err) { + request_mutex_release(); return ERR_PTR(err); + } } - /* Watches can have fired before reply comes: daemon detects - * and re-transmits, so we can ignore this. */ - do { - kfree(ret); - ret = read_reply(&msg.type, len); - if (IS_ERR(ret)) - return ret; - } while (msg.type == XS_WATCH_EVENT); + ret = read_reply(&msg.type, len); + + request_mutex_release(); + + if (IS_ERR(ret)) + return ret; if (msg.type == XS_ERROR) { err = get_error(ret); @@ -187,8 +283,6 @@ static char *join(const char *dir, const char *name) { static char buffer[4096]; - BUG_ON(down_trylock(&xenbus_lock) == 0); - /* XXX FIXME: might not be correct if name == "" */ BUG_ON(strlen(dir) + strlen("/") + strlen(name) + 1 > sizeof(buffer)); strcpy(buffer, dir); @@ -207,7 +301,7 @@ static char **split(char *strings, unsigned int len, unsigned int *num) *num = count_strings(strings, len); /* Transfer to one big alloc for easy freeing. */ - ret = kmalloc(*num * sizeof(char *) + len, GFP_ATOMIC); + ret = kmalloc(*num * sizeof(char *) + len, GFP_KERNEL); if (!ret) { kfree(strings); return ERR_PTR(-ENOMEM); @@ -298,7 +392,18 @@ EXPORT_SYMBOL(xenbus_rm); */ int xenbus_transaction_start(void) { - return xs_error(xs_single(XS_TRANSACTION_START, "", NULL)); + int err; + + down(&xs_state.transaction_mutex); + xs_state.transaction_pid = current->pid; + + err = xs_error(xs_single(XS_TRANSACTION_START, "", NULL)); + if (err) { + xs_state.transaction_pid = -1; + up(&xs_state.transaction_mutex); + } + + return err; } EXPORT_SYMBOL(xenbus_transaction_start); @@ -308,12 +413,19 @@ EXPORT_SYMBOL(xenbus_transaction_start); int xenbus_transaction_end(int abort) { char abortstr[2]; + int err; if (abort) strcpy(abortstr, "F"); else strcpy(abortstr, "T"); - return xs_error(xs_single(XS_TRANSACTION_END, abortstr, NULL)); + + err = xs_error(xs_single(XS_TRANSACTION_END, abortstr, NULL)); + + xs_state.transaction_pid = -1; + up(&xs_state.transaction_mutex); + + return err; } EXPORT_SYMBOL(xenbus_transaction_end); @@ -344,14 +456,23 @@ int xenbus_printf(const char *dir, const char *node, const char *fmt, ...) { va_list ap; int ret; +#define PRINTF_BUFFER_SIZE 4096 + char *printf_buffer; + + printf_buffer = kmalloc(PRINTF_BUFFER_SIZE, GFP_KERNEL); + if (printf_buffer == NULL) + return -ENOMEM; - BUG_ON(down_trylock(&xenbus_lock) == 0); va_start(ap, fmt); - ret = vsnprintf(printf_buffer, sizeof(printf_buffer), fmt, ap); + ret = vsnprintf(printf_buffer, PRINTF_BUFFER_SIZE, fmt, ap); va_end(ap); - BUG_ON(ret > sizeof(printf_buffer)-1); - return xenbus_write(dir, node, printf_buffer); + BUG_ON(ret > PRINTF_BUFFER_SIZE-1); + ret = xenbus_write(dir, node, printf_buffer); + + kfree(printf_buffer); + + return ret; } EXPORT_SYMBOL(xenbus_printf); @@ -361,19 +482,28 @@ void xenbus_dev_error(struct xenbus_device *dev, int err, const char *fmt, ...) va_list ap; int ret; unsigned int len; + char *printf_buffer; - BUG_ON(down_trylock(&xenbus_lock) == 0); + printf_buffer = kmalloc(PRINTF_BUFFER_SIZE, GFP_KERNEL); + if (printf_buffer == NULL) + goto fail; len = sprintf(printf_buffer, "%i ", -err); va_start(ap, fmt); - ret = vsnprintf(printf_buffer+len, sizeof(printf_buffer)-len, fmt, ap); + ret = vsnprintf(printf_buffer+len, PRINTF_BUFFER_SIZE-len, fmt, ap); va_end(ap); - BUG_ON(len + ret > sizeof(printf_buffer)-1); + BUG_ON(len + ret > PRINTF_BUFFER_SIZE-1); dev->has_error = 1; if (xenbus_write(dev->nodename, "error", printf_buffer) != 0) - printk("xenbus: failed to write error node for %s (%s)\n", - dev->nodename, printf_buffer); + goto fail; + + kfree(printf_buffer); + return; + + fail: + printk("xenbus: failed to write error node for %s (%s)\n", + dev->nodename, printf_buffer); } EXPORT_SYMBOL(xenbus_dev_error); @@ -432,26 +562,6 @@ static int xs_watch(const char *path, const char *token) return xs_error(xs_talkv(XS_WATCH, iov, ARRAY_SIZE(iov), NULL)); } -static char **xs_read_watch(unsigned int *num) -{ - enum xsd_sockmsg_type type; - char *strings; - unsigned int len; - - strings = read_reply(&type, &len); - if (IS_ERR(strings)) - return (char **)strings; - - BUG_ON(type != XS_WATCH_EVENT); - - return split(strings, len, num); -} - -static int xs_acknowledge_watch(const char *token) -{ - return xs_error(xs_single(XS_WATCH_ACK, token, NULL)); -} - static int xs_unwatch(const char *path, const char *token) { struct kvec iov[2]; @@ -464,7 +574,6 @@ static int xs_unwatch(const char *path, const char *token) return xs_error(xs_talkv(XS_UNWATCH, iov, ARRAY_SIZE(iov), NULL)); } -/* A little paranoia: we don't just trust token. */ static struct xenbus_watch *find_watch(const char *token) { struct xenbus_watch *i, *cmp; @@ -474,6 +583,7 @@ static struct xenbus_watch *find_watch(const char *token) list_for_each_entry(i, &watches, list) if (i == cmp) return i; + return NULL; } @@ -485,11 +595,20 @@ int register_xenbus_watch(struct xenbus_watch *watch) int err; sprintf(token, "%lX", (long)watch); + + spin_lock(&watches_lock); BUG_ON(find_watch(token)); + spin_unlock(&watches_lock); err = xs_watch(watch->node, token); - if (!err) + + /* Ignore errors due to multiple registration. */ + if ((err == 0) || (err == -EEXIST)) { + spin_lock(&watches_lock); list_add(&watch->list, &watches); + spin_unlock(&watches_lock); + } + return err; } EXPORT_SYMBOL(register_xenbus_watch); @@ -500,77 +619,188 @@ void unregister_xenbus_watch(struct xenbus_watch *watch) int err; sprintf(token, "%lX", (long)watch); - BUG_ON(!find_watch(token)); - err = xs_unwatch(watch->node, token); + spin_lock(&watches_lock); + BUG_ON(!find_watch(token)); list_del(&watch->list); + spin_unlock(&watches_lock); + + /* Ensure xs_resume() is not in progress (see comments there). */ + wait_event(xs_resuming_waitq, !xs_resuming); + err = xs_unwatch(watch->node, token); if (err) printk(KERN_WARNING "XENBUS Failed to release watch %s: %i\n", watch->node, err); + + /* Make sure watch is not in use. */ + flush_scheduled_work(); } EXPORT_SYMBOL(unregister_xenbus_watch); -/* Re-register callbacks to all watches. */ -void reregister_xenbus_watches(void) +void xs_suspend(void) { + down(&xs_state.transaction_mutex); + down(&xs_state.request_mutex); +} + +void xs_resume(void) +{ + struct list_head *ent, *prev_ent = &watches; struct xenbus_watch *watch; char token[sizeof(watch) * 2 + 1]; - list_for_each_entry(watch, &watches, list) { - sprintf(token, "%lX", (long)watch); - xs_watch(watch->node, token); + /* Protect against concurrent unregistration and freeing of watches. */ + BUG_ON(xs_resuming); + xs_resuming = 1; + + up(&xs_state.request_mutex); + up(&xs_state.transaction_mutex); + + /* + * Iterate over the watch list re-registering each node. We must + * be careful about concurrent registrations and unregistrations. + * We search for the node immediately following the previously + * re-registered node. If we get no match then either we are done + * (previous node is last in list) or the node was unregistered, in + * which case we restart from the beginning of the list. + * register_xenbus_watch() + unregister_xenbus_watch() is safe because + * it will only ever move a watch node earlier in the list, so it + * cannot cause us to skip nodes. + */ + for (;;) { + spin_lock(&watches_lock); + list_for_each(ent, &watches) + if (ent->prev == prev_ent) + break; + spin_unlock(&watches_lock); + + /* No match because prev_ent is at the end of the list? */ + if ((ent == &watches) && (watches.prev == prev_ent)) + break; /* We're done! */ + + if ((prev_ent = ent) != &watches) { + /* + * Safe even with watch_lock not held. We are saved by + * (xs_resumed==1) check in unregister_xenbus_watch. + */ + watch = list_entry(ent, struct xenbus_watch, list); + sprintf(token, "%lX", (long)watch); + xs_watch(watch->node, token); + } } + + xs_resuming = 0; + wake_up(&xs_resuming_waitq); } -static int watch_thread(void *unused) +static void xenbus_fire_watch(void *arg) { + struct xs_stored_msg *msg = arg; + + msg->u.watch.handle->callback(msg->u.watch.handle, + (const char **)msg->u.watch.vec, + msg->u.watch.vec_size); + + kfree(msg->u.watch.vec); + kfree(msg); +} + +static int process_msg(void) +{ + struct xs_stored_msg *msg; + char *body; + int err; + + msg = kmalloc(sizeof(*msg), GFP_KERNEL); + if (msg == NULL) + return -ENOMEM; + + err = xb_read(&msg->hdr, sizeof(msg->hdr)); + if (err) { + kfree(msg); + return err; + } + + body = kmalloc(msg->hdr.len + 1, GFP_KERNEL); + if (body == NULL) { + kfree(msg); + return -ENOMEM; + } + + err = xb_read(body, msg->hdr.len); + if (err) { + kfree(body); + kfree(msg); + return err; + } + body[msg->hdr.len] = '\0'; + + if (msg->hdr.type == XS_WATCH_EVENT) { + INIT_WORK(&msg->u.watch.work, xenbus_fire_watch, msg); + + msg->u.watch.vec = split(body, msg->hdr.len, + &msg->u.watch.vec_size); + if (IS_ERR(msg->u.watch.vec)) { + kfree(msg); + return PTR_ERR(msg->u.watch.vec); + } + + spin_lock(&watches_lock); + msg->u.watch.handle = find_watch( + msg->u.watch.vec[XS_WATCH_TOKEN]); + if (msg->u.watch.handle != NULL) { + schedule_work(&msg->u.watch.work); + } else { + kfree(msg->u.watch.vec); + kfree(msg); + } + spin_unlock(&watches_lock); + } else { + msg->u.reply.body = body; + spin_lock(&xs_state.reply_lock); + list_add_tail(&msg->u.reply.list, &xs_state.reply_list); + spin_unlock(&xs_state.reply_lock); + wake_up(&xs_state.reply_waitq); + } + + return 0; +} + +static int read_thread(void *unused) +{ + int err; + for (;;) { - char **vec = NULL; - unsigned int num; - - wait_event(xb_waitq, xs_input_avail()); - - /* If this is a spurious wakeup caused by someone - * doing an op, they'll hold the lock and the buffer - * will be empty by the time we get there. - */ - down(&xenbus_lock); - if (xs_input_avail()) - vec = xs_read_watch(&num); - - if (vec && !IS_ERR(vec)) { - struct xenbus_watch *w; - int err; - - err = xs_acknowledge_watch(vec[XS_WATCH_TOKEN]); - if (err) - printk(KERN_WARNING "XENBUS ack %s fail %i\n", - vec[XS_WATCH_TOKEN], err); - w = find_watch(vec[XS_WATCH_TOKEN]); - BUG_ON(!w); - w->callback(w, (const char **)vec, num); - kfree(vec); - } else if (vec) - printk(KERN_WARNING "XENBUS xs_read_watch: %li\n", - PTR_ERR(vec)); - up(&xenbus_lock); + err = process_msg(); + if (err) + printk(KERN_WARNING "XENBUS error %d while reading " + "message\n", err); } } int xs_init(void) { int err; - struct task_struct *watcher; + struct task_struct *reader; + + INIT_LIST_HEAD(&xs_state.reply_list); + spin_lock_init(&xs_state.reply_lock); + init_waitqueue_head(&xs_state.reply_waitq); + + init_MUTEX(&xs_state.request_mutex); + init_MUTEX(&xs_state.transaction_mutex); + xs_state.transaction_pid = -1; err = xb_init_comms(); if (err) return err; - watcher = kthread_run(watch_thread, NULL, "kxbwatch"); - if (IS_ERR(watcher)) - return PTR_ERR(watcher); + reader = kthread_run(read_thread, NULL, "xenbusd"); + if (IS_ERR(reader)) + return PTR_ERR(reader); + return 0; } diff --git a/linux-2.6-xen-sparse/include/asm-xen/xenbus.h b/linux-2.6-xen-sparse/include/asm-xen/xenbus.h index 219cc96b4f..3db3254d96 100644 --- a/linux-2.6-xen-sparse/include/asm-xen/xenbus.h +++ b/linux-2.6-xen-sparse/include/asm-xen/xenbus.h @@ -78,10 +78,6 @@ int xenbus_register_driver(struct xenbus_driver *drv); int xenbus_register_backend(struct xenbus_driver *drv); void xenbus_unregister_driver(struct xenbus_driver *drv); -/* Caller must hold this lock to call these functions: it's also held - * across watch callbacks. */ -extern struct semaphore xenbus_lock; - char **xenbus_directory(const char *dir, const char *node, unsigned int *num); void *xenbus_read(const char *dir, const char *node, unsigned int *len); int xenbus_write(const char *dir, const char *node, const char *string); @@ -113,7 +109,11 @@ void xenbus_dev_ok(struct xenbus_device *dev); struct xenbus_watch { struct list_head list; + + /* Path being watched. */ char *node; + + /* Callback (executed in a process context with no locks held). */ void (*callback)(struct xenbus_watch *, const char **vec, unsigned int len); }; @@ -124,7 +124,11 @@ void unregister_xenstore_notifier(struct notifier_block *nb); int register_xenbus_watch(struct xenbus_watch *watch); void unregister_xenbus_watch(struct xenbus_watch *watch); -void reregister_xenbus_watches(void); +void xs_suspend(void); +void xs_resume(void); + +/* Used by xenbus_dev to borrow kernel's store connection. */ +void *xenbus_dev_request_and_reply(struct xsd_sockmsg *msg); /* Called from xen core code. */ void xenbus_suspend(void); diff --git a/tools/blktap/xenbus.c b/tools/blktap/xenbus.c index c9cbea5fb3..99ba9ea49e 100644 --- a/tools/blktap/xenbus.c +++ b/tools/blktap/xenbus.c @@ -260,10 +260,6 @@ int xs_fire_next_watch(struct xs_handle *h) node = res[XS_WATCH_PATH]; token = res[XS_WATCH_TOKEN]; - er = xs_acknowledge_watch(h, token); - if (er == 0) - warn("Couldn't acknowledge watch (%s)", token); - w = find_watch(token); if (!w) { diff --git a/tools/console/daemon/io.c b/tools/console/daemon/io.c index 24afe0b513..d533bd9b5c 100644 --- a/tools/console/daemon/io.c +++ b/tools/console/daemon/io.c @@ -505,7 +505,6 @@ static void handle_xs(int fd) domain_create_ring(dom); } - xs_acknowledge_watch(xs, vec[1]); free(vec); } diff --git a/tools/python/xen/lowlevel/xs/xs.c b/tools/python/xen/lowlevel/xs/xs.c index 7389ea20ff..9c244d195a 100644 --- a/tools/python/xen/lowlevel/xs/xs.c +++ b/tools/python/xen/lowlevel/xs/xs.c @@ -442,9 +442,6 @@ static PyObject *xspy_watch(PyObject *self, PyObject *args, PyObject *kwds) #define xspy_read_watch_doc "\n" \ "Read a watch notification.\n" \ - "The notification must be acknowledged by passing\n" \ - "the token to acknowledge_watch().\n" \ - " path [string]: xenstore path.\n" \ "\n" \ "Returns: [tuple] (path, token).\n" \ "Raises RuntimeError on error.\n" \ @@ -495,44 +492,6 @@ static PyObject *xspy_read_watch(PyObject *self, PyObject *args, return val; } -#define xspy_acknowledge_watch_doc "\n" \ - "Acknowledge a watch notification that has been read.\n" \ - " token [string] : from the watch notification\n" \ - "\n" \ - "Returns None on success.\n" \ - "Raises RuntimeError on error.\n" \ - "\n" - -static PyObject *xspy_acknowledge_watch(PyObject *self, PyObject *args, - PyObject *kwds) -{ - static char *kwd_spec[] = { "token", NULL }; - static char *arg_spec = "O"; - PyObject *token; - char token_str[MAX_STRLEN(unsigned long) + 1]; - - struct xs_handle *xh = xshandle(self); - PyObject *val = NULL; - int xsval = 0; - - if (!xh) - goto exit; - if (!PyArg_ParseTupleAndKeywords(args, kwds, arg_spec, kwd_spec, &token)) - goto exit; - sprintf(token_str, "%li", (unsigned long)token); - Py_BEGIN_ALLOW_THREADS - xsval = xs_acknowledge_watch(xh, token_str); - Py_END_ALLOW_THREADS - if (!xsval) { - PyErr_SetFromErrno(PyExc_RuntimeError); - goto exit; - } - Py_INCREF(Py_None); - val = Py_None; - exit: - return val; -} - #define xspy_unwatch_doc "\n" \ "Stop watching a path.\n" \ " path [string] : xenstore path.\n" \ @@ -833,7 +792,6 @@ static PyMethodDef xshandle_methods[] = { XSPY_METH(set_permissions), XSPY_METH(watch), XSPY_METH(read_watch), - XSPY_METH(acknowledge_watch), XSPY_METH(unwatch), XSPY_METH(transaction_start), XSPY_METH(transaction_end), diff --git a/tools/python/xen/xend/xenstore/xswatch.py b/tools/python/xen/xend/xenstore/xswatch.py index 8baf36e430..8a2da8d12c 100644 --- a/tools/python/xen/xend/xenstore/xswatch.py +++ b/tools/python/xen/xend/xenstore/xswatch.py @@ -8,6 +8,7 @@ import select import threading from xen.lowlevel import xs +from xen.xend.xenstore.xsutil import xshandle class xswatch: @@ -27,10 +28,7 @@ class xswatch: if cls.watchThread: cls.xslock.release() return - # XXX: When we fix xenstored to have better watch semantics, - # this can change to shared xshandle(). Currently that would result - # in duplicate watch firings, thus failed extra xs.acknowledge_watch. - cls.xs = xs.open() + cls.xs = xshandle() cls.watchThread = threading.Thread(name="Watcher", target=cls.watchMain) cls.watchThread.setDaemon(True) @@ -43,11 +41,10 @@ class xswatch: while True: try: we = cls.xs.read_watch() - watch = we[1] - cls.xs.acknowledge_watch(watch) except RuntimeError, ex: print ex raise + watch = we[1] watch.fn(*watch.args, **watch.kwargs) watchMain = classmethod(watchMain) diff --git a/tools/xenstore/testsuite/07watch.test b/tools/xenstore/testsuite/07watch.test index 5ae025765e..b4a44adebc 100644 --- a/tools/xenstore/testsuite/07watch.test +++ b/tools/xenstore/testsuite/07watch.test @@ -5,7 +5,6 @@ write /test contents 2 write /test contents2 expect 1:/test:token 1 waitwatch -1 ackwatch token 1 close # Check that reads don't set it off. @@ -22,15 +21,12 @@ mkdir /dir 2 mkdir /dir/newdir expect 1:/dir/newdir:token 1 waitwatch -1 ackwatch token 2 setperm /dir/newdir 0 READ expect 1:/dir/newdir:token 1 waitwatch -1 ackwatch token 2 rm /dir/newdir expect 1:/dir/newdir:token 1 waitwatch -1 ackwatch token 1 close 2 close @@ -49,7 +45,6 @@ expect contents read /dir/test expect /dir/test:token waitwatch -ackwatch token close # watch priority test: all simultaneous @@ -59,13 +54,10 @@ close write /dir/test contents expect 3:/dir/test:token3 3 waitwatch -3 ackwatch token3 expect 2:/dir/test:token2 2 waitwatch -2 ackwatch token2 expect 1:/dir/test:token1 1 waitwatch -1 ackwatch token1 1 close 2 close 3 close @@ -79,7 +71,6 @@ expect 2:/dir/test:token2 2 close expect 1:/dir/test:token1 1 waitwatch -1 ackwatch token1 1 close # If one dies (without reading at all), the other should still get ack. @@ -89,7 +80,6 @@ write /dir/test contents 2 close expect 1:/dir/test:token1 1 waitwatch -1 ackwatch token1 1 close 2 close @@ -111,7 +101,6 @@ write /dir/test contents 2 unwatch /dir token2 expect 1:/dir/test:token1 1 waitwatch -1 ackwatch token1 1 close 2 close @@ -123,14 +112,12 @@ write /dir/test contents write /dir/test contents2 expect 1:/dir/test:token2 1 waitwatch -1 ackwatch token2 # check we only get notified once. 1 watch /test token 2 write /test contents2 expect 1:/test:token 1 waitwatch -1 ackwatch token expect 1: waitwatch failed: Connection timed out 1 waitwatch 1 close @@ -142,13 +129,10 @@ expect 1: waitwatch failed: Connection timed out 2 write /test3 contents expect 1:/test1:token 1 waitwatch -1 ackwatch token expect 1:/test2:token 1 waitwatch -1 ackwatch token expect 1:/test3:token 1 waitwatch -1 ackwatch token 1 close # Creation of subpaths should be covered correctly. @@ -157,10 +141,8 @@ expect 1:/test3:token 2 write /test/subnode/subnode contents2 expect 1:/test/subnode:token 1 waitwatch -1 ackwatch token expect 1:/test/subnode/subnode:token 1 waitwatch -1 ackwatch token expect 1: waitwatch failed: Connection timed out 1 waitwatch 1 close @@ -171,7 +153,6 @@ expect 1: waitwatch failed: Connection timed out 1 watchnoack / token2 0 expect 1:/test/subnode:token 1 waitwatch -1 ackwatch token expect 1:/:token2 1 waitwatch expect 1: waitwatch failed: Connection timed out @@ -183,7 +164,6 @@ expect 1: waitwatch failed: Connection timed out 2 rm /test expect 1:/test/subnode:token 1 waitwatch -1 ackwatch token # Watch should not double-send after we ack, even if we did something in between. 1 watch /test2 token @@ -192,6 +172,5 @@ expect 1:/test2/foo:token 1 waitwatch expect 1:contents2 1 read /test2/foo -1 ackwatch token expect 1: waitwatch failed: Connection timed out 1 waitwatch diff --git a/tools/xenstore/testsuite/08transaction.test b/tools/xenstore/testsuite/08transaction.test index 558e06b9bc..c65fb647fa 100644 --- a/tools/xenstore/testsuite/08transaction.test +++ b/tools/xenstore/testsuite/08transaction.test @@ -68,7 +68,6 @@ expect 1: waitwatch failed: Connection timed out 2 commit expect 1:/test/dir/sub:token 1 waitwatch -1 ackwatch token 1 close # Rm inside transaction works like rm outside: children get notified. @@ -78,7 +77,6 @@ expect 1:/test/dir/sub:token 2 commit expect 1:/test/dir/sub:token 1 waitwatch -1 ackwatch token 1 close # Multiple events from single transaction don't trigger assert @@ -89,8 +87,6 @@ expect 1:/test/dir/sub:token 2 commit expect 1:/test/1:token 1 waitwatch -1 ackwatch token expect 1:/test/2:token 1 waitwatch -1 ackwatch token 1 close diff --git a/tools/xenstore/testsuite/10domain-homedir.test b/tools/xenstore/testsuite/10domain-homedir.test index 077606c0a5..88e4672909 100644 --- a/tools/xenstore/testsuite/10domain-homedir.test +++ b/tools/xenstore/testsuite/10domain-homedir.test @@ -16,4 +16,3 @@ dir /home write /home/foo/bar contents expect 1:foo/bar:token 1 waitwatch -1 ackwatch token diff --git a/tools/xenstore/testsuite/11domain-watch.test b/tools/xenstore/testsuite/11domain-watch.test index 159a85402e..3b28ee3094 100644 --- a/tools/xenstore/testsuite/11domain-watch.test +++ b/tools/xenstore/testsuite/11domain-watch.test @@ -10,7 +10,6 @@ introduce 1 100 7 /my/home write /test contents2 expect 1:/test:token 1 waitwatch -1 ackwatch token 1 unwatch /test token release 1 1 close @@ -25,7 +24,6 @@ write /dir/test contents 1 write /dir/test4 contents4 expect 1:/dir/test:token 1 waitwatch -1 ackwatch token release 1 1 close diff --git a/tools/xenstore/testsuite/12readonly.test b/tools/xenstore/testsuite/12readonly.test index 04331fab9d..14878e7dc1 100644 --- a/tools/xenstore/testsuite/12readonly.test +++ b/tools/xenstore/testsuite/12readonly.test @@ -36,4 +36,3 @@ watch / token 1 write /test contents expect /test:token waitwatch -ackwatch token diff --git a/tools/xenstore/testsuite/13watch-ack.test b/tools/xenstore/testsuite/13watch-ack.test index 41f2443aa0..e15f2b85e3 100644 --- a/tools/xenstore/testsuite/13watch-ack.test +++ b/tools/xenstore/testsuite/13watch-ack.test @@ -18,5 +18,4 @@ expect 1:/test/2:token2 1 waitwatch 3 write /test/1 contents1 4 write /test/3 contents3 -1 ackwatch token2 1 close diff --git a/tools/xenstore/xenstored_core.c b/tools/xenstore/xenstored_core.c index 2061721547..25c9b03bb8 100644 --- a/tools/xenstore/xenstored_core.c +++ b/tools/xenstore/xenstored_core.c @@ -154,7 +154,6 @@ static char *sockmsg_string(enum xsd_sockmsg_type type) case XS_READ: return "READ"; case XS_GET_PERMS: return "GET_PERMS"; case XS_WATCH: return "WATCH"; - case XS_WATCH_ACK: return "WATCH_ACK"; case XS_UNWATCH: return "UNWATCH"; case XS_TRANSACTION_START: return "TRANSACTION_START"; case XS_TRANSACTION_END: return "TRANSACTION_END"; @@ -1103,10 +1102,6 @@ static void process_message(struct connection *conn, struct buffered_data *in) do_watch(conn, in); break; - case XS_WATCH_ACK: - do_watch_ack(conn, onearg(in)); - break; - case XS_UNWATCH: do_unwatch(conn, in); break; @@ -1168,11 +1163,6 @@ static void consider_message(struct connection *conn) xprintf("Got message %s len %i from %p\n", sockmsg_string(type), conn->in->hdr.msg.len, conn); - /* We might get a command while waiting for an ack: this means - * the other end discarded it: we will re-transmit. */ - if (type != XS_WATCH_ACK) - conn->waiting_for_ack = NULL; - /* Careful: process_message may free connection. We detach * "in" beforehand and allocate the new buffer to avoid * touching conn after process_message. @@ -1266,7 +1256,6 @@ struct connection *new_connection(connwritefn_t *write, connreadfn_t *read) new->state = OK; new->out = new->waiting_reply = NULL; - new->waiting_for_ack = NULL; new->fd = -1; new->id = 0; new->domain = NULL; diff --git a/tools/xenstore/xenstored_core.h b/tools/xenstore/xenstored_core.h index e0e4d58886..e017bddf42 100644 --- a/tools/xenstore/xenstored_core.h +++ b/tools/xenstore/xenstored_core.h @@ -71,9 +71,6 @@ struct connection /* Is this a read-only connection? */ bool can_write; - /* Are we waiting for a watch event ack? */ - struct watch *waiting_for_ack; - /* Buffered incoming data. */ struct buffered_data *in; diff --git a/tools/xenstore/xenstored_watch.c b/tools/xenstore/xenstored_watch.c index 0f8bc6fbdf..dfb0cd1a3e 100644 --- a/tools/xenstore/xenstored_watch.c +++ b/tools/xenstore/xenstored_watch.c @@ -69,18 +69,14 @@ void queue_next_event(struct connection *conn) if (conn->waiting_reply) { conn->out = conn->waiting_reply; conn->waiting_reply = NULL; - conn->waiting_for_ack = NULL; return; } - /* If we're already waiting for ack, don't queue more. */ - if (conn->waiting_for_ack) - return; - list_for_each_entry(watch, &conn->watches, list) { event = list_top(&watch->events, struct watch_event, list); if (event) { - conn->waiting_for_ack = watch; + list_del(&event->list); + talloc_free(event); send_reply(conn,XS_WATCH_EVENT,event->data,event->len); break; } @@ -181,6 +177,15 @@ void do_watch(struct connection *conn, struct buffered_data *in) } } + /* Check for duplicates. */ + list_for_each_entry(watch, &conn->watches, list) { + if (streq(watch->node, vec[0]) && + streq(watch->token, vec[1])) { + send_error(conn, EEXIST); + return; + } + } + watch = talloc(conn, struct watch); watch->node = talloc_strdup(watch, vec[0]); watch->token = talloc_strdup(watch, vec[1]); @@ -200,37 +205,6 @@ void do_watch(struct connection *conn, struct buffered_data *in) add_event(conn, watch, watch->node); } -void do_watch_ack(struct connection *conn, const char *token) -{ - struct watch_event *event; - - if (!token) { - send_error(conn, EINVAL); - return; - } - - if (!conn->waiting_for_ack) { - send_error(conn, ENOENT); - return; - } - - if (!streq(conn->waiting_for_ack->token, token)) { - /* They're confused: this will cause us to send event again */ - conn->waiting_for_ack = NULL; - send_error(conn, EINVAL); - return; - } - - /* Remove event: after ack sent, core will call queue_next_event */ - event = list_top(&conn->waiting_for_ack->events, struct watch_event, - list); - list_del(&event->list); - talloc_free(event); - - conn->waiting_for_ack = NULL; - send_ack(conn, XS_WATCH_ACK); -} - void do_unwatch(struct connection *conn, struct buffered_data *in) { struct watch *watch; @@ -241,9 +215,6 @@ void do_unwatch(struct connection *conn, struct buffered_data *in) return; } - /* We don't need to worry if we're waiting for an ack for the - * watch we're deleting: conn->waiting_for_ack was reset by - * this command in consider_message anyway. */ node = canonicalize(conn, vec[0]); list_for_each_entry(watch, &conn->watches, list) { if (streq(watch->node, node) && streq(watch->token, vec[1])) { @@ -262,11 +233,6 @@ void dump_watches(struct connection *conn) struct watch *watch; struct watch_event *event; - if (conn->waiting_for_ack) - printf(" waiting_for_ack for watch on %s token %s\n", - conn->waiting_for_ack->node, - conn->waiting_for_ack->token); - list_for_each_entry(watch, &conn->watches, list) { printf(" watch on %s token %s\n", watch->node, watch->token); diff --git a/tools/xenstore/xs.c b/tools/xenstore/xs.c index 6dc4c4532c..1b648d2578 100644 --- a/tools/xenstore/xs.c +++ b/tools/xenstore/xs.c @@ -78,10 +78,30 @@ struct xs_handle { /* One transaction at a time. */ pthread_mutex_t transaction_mutex; + pthread_t transaction_pthread; }; static void *read_thread(void *arg); +static void request_mutex_acquire(struct xs_handle *h) +{ + /* + * We can't distinguish non-transactional from transactional + * requests right now. So temporarily acquire the transaction mutex + * if this task is outside transaction context. + */ + if (h->transaction_pthread != pthread_self()) + pthread_mutex_lock(&h->transaction_mutex); + pthread_mutex_lock(&h->request_mutex); +} + +static void request_mutex_release(struct xs_handle *h) +{ + pthread_mutex_unlock(&h->request_mutex); + if (h->transaction_pthread != pthread_self()) + pthread_mutex_unlock(&h->transaction_mutex); +} + int xs_fileno(struct xs_handle *h) { char c = 0; @@ -163,6 +183,7 @@ static struct xs_handle *get_handle(const char *connect_to) pthread_mutex_init(&h->request_mutex, NULL); pthread_mutex_init(&h->transaction_mutex, NULL); + h->transaction_pthread = -1; if (pthread_create(&h->read_thr, NULL, read_thread, h) != 0) goto error; @@ -316,7 +337,7 @@ static void *xs_talkv(struct xs_handle *h, enum xsd_sockmsg_type type, ignorepipe.sa_flags = 0; sigaction(SIGPIPE, &ignorepipe, &oldact); - pthread_mutex_lock(&h->request_mutex); + request_mutex_acquire(h); if (!xs_write_all(h->fd, &msg, sizeof(msg))) goto fail; @@ -329,7 +350,7 @@ static void *xs_talkv(struct xs_handle *h, enum xsd_sockmsg_type type, if (!ret) goto fail; - pthread_mutex_unlock(&h->request_mutex); + request_mutex_release(h); sigaction(SIGPIPE, &oldact, NULL); if (msg.type == XS_ERROR) { @@ -350,7 +371,7 @@ static void *xs_talkv(struct xs_handle *h, enum xsd_sockmsg_type type, fail: /* We're in a bad state, so close fd. */ saved_errno = errno; - pthread_mutex_unlock(&h->request_mutex); + request_mutex_release(h); sigaction(SIGPIPE, &oldact, NULL); close_fd: close(h->fd); @@ -593,15 +614,6 @@ char **xs_read_watch(struct xs_handle *h, unsigned int *num) return ret; } -/* Acknowledge watch on node. Watches must be acknowledged before - * any other watches can be read. - * Returns false on failure. - */ -bool xs_acknowledge_watch(struct xs_handle *h, const char *token) -{ - return xs_bool(xs_single(h, XS_WATCH_ACK, token, NULL)); -} - /* Remove a watch on a node. * Returns false on failure (no watch on that node). */ @@ -624,8 +636,18 @@ bool xs_unwatch(struct xs_handle *h, const char *path, const char *token) */ bool xs_transaction_start(struct xs_handle *h) { + bool rc; + pthread_mutex_lock(&h->transaction_mutex); - return xs_bool(xs_single(h, XS_TRANSACTION_START, "", NULL)); + h->transaction_pthread = pthread_self(); + + rc = xs_bool(xs_single(h, XS_TRANSACTION_START, "", NULL)); + if (!rc) { + h->transaction_pthread = -1; + pthread_mutex_unlock(&h->transaction_mutex); + } + + return rc; } /* End a transaction. @@ -645,6 +667,7 @@ bool xs_transaction_end(struct xs_handle *h, bool abort) rc = xs_bool(xs_single(h, XS_TRANSACTION_END, abortstr, NULL)); + h->transaction_pthread = -1; pthread_mutex_unlock(&h->transaction_mutex); return rc; diff --git a/tools/xenstore/xs.h b/tools/xenstore/xs.h index b027a360dd..59dd96d5da 100644 --- a/tools/xenstore/xs.h +++ b/tools/xenstore/xs.h @@ -96,12 +96,6 @@ int xs_fileno(struct xs_handle *h); */ char **xs_read_watch(struct xs_handle *h, unsigned int *num); -/* Acknowledge watch on node. Watches must be acknowledged before - * any other watches can be read. - * Returns false on failure. - */ -bool xs_acknowledge_watch(struct xs_handle *h, const char *token); - /* Remove a watch on a node: implicitly acks any outstanding watch. * Returns false on failure (no watch on that node). */ diff --git a/tools/xenstore/xs_test.c b/tools/xenstore/xs_test.c index 2517ac1560..45b9085153 100644 --- a/tools/xenstore/xs_test.c +++ b/tools/xenstore/xs_test.c @@ -201,7 +201,6 @@ static void __attribute__((noreturn)) usage(void) " watch \n" " watchnoack \n" " waitwatch\n" - " ackwatch \n" " unwatch \n" " close\n" " start \n" @@ -455,8 +454,6 @@ static void do_watch(unsigned int handle, const char *node, const char *token, !streq(vec[XS_WATCH_PATH], node) || !streq(vec[XS_WATCH_TOKEN], token)) failed(handle); - if (!xs_acknowledge_watch(handles[handle], token)) - failed(handle); } } @@ -515,12 +512,6 @@ static void do_waitwatch(unsigned int handle) free(vec); } -static void do_ackwatch(unsigned int handle, const char *token) -{ - if (!xs_acknowledge_watch(handles[handle], token)) - failed(handle); -} - static void do_unwatch(unsigned int handle, const char *node, const char *token) { if (!xs_unwatch(handles[handle], node, token)) @@ -746,8 +737,6 @@ static void do_command(unsigned int default_handle, char *line) do_watch(handle, arg(line, 1), arg(line, 2), false); else if (streq(command, "waitwatch")) do_waitwatch(handle); - else if (streq(command, "ackwatch")) - do_ackwatch(handle, arg(line, 1)); else if (streq(command, "unwatch")) do_unwatch(handle, arg(line, 1), arg(line, 2)); else if (streq(command, "close")) { diff --git a/xen/include/public/io/xs_wire.h b/xen/include/public/io/xs_wire.h index 066c709e30..3c26cc0d80 100644 --- a/xen/include/public/io/xs_wire.h +++ b/xen/include/public/io/xs_wire.h @@ -35,11 +35,9 @@ enum xsd_sockmsg_type XS_READ, XS_GET_PERMS, XS_WATCH, - XS_WATCH_ACK, XS_UNWATCH, XS_TRANSACTION_START, XS_TRANSACTION_END, - XS_OP_READ_ONLY = XS_TRANSACTION_END, XS_INTRODUCE, XS_RELEASE, XS_GET_DOMAIN_PATH, -- 2.30.2